In [1]:
import numpy as np
import pandas as pd
from scipy.optimize import minimize
from sqlalchemy import create_engine, text
from typing import Optional
import plotly.graph_objects as go

# Set the display options
pd.set_option("display.max_colwidth", None)
pd.set_option("display.max_rows", None)
pd.set_option("display.max_columns", None)
In [2]:
# define the PortfolioOptimization class with core-satellite strategy
class PortfolioOptimization:
    def __init__(self, data: pd.DataFrame, risk_free_rate: float = 0.0, weight_bounds=(0.0, 1.0)):
        self.assets = data.columns
        self.data = data
        self.returns = data.pct_change().dropna()
        self.expected_returns = self.returns.mean().dropna() * 252  # annualized
        self.cov_matrix = self.returns.cov().dropna() * 252
        self.risk_free_rate = risk_free_rate
        self.weight_bounds = weight_bounds
        self.number_of_assets = len(data.columns)

    def evaluate_target_return(self, portfolio: dict, cpi: float = 0.03) -> str:
        """Check if portfolio return meets CPI + 6% target."""
        target = cpi + 0.06
        actual = portfolio["expected_return"]
    
        if actual >= target:
            result = f"✅ Portfolio meets the target return of {target:.2%} (Actual: {actual:.2%})"
        else:
            result = f"❌ Portfolio does NOT meet the target return of {target:.2%} (Actual: {actual:.2%})"
        return result

    def _portfolio_variance(self, weights: np.ndarray) -> float:
        return weights.T @ self.cov_matrix.values @ weights

    def _portfolio_return(self, weights: np.ndarray) -> float:
        return np.sum(weights * self.expected_returns.values)

    def _negative_sharpe_ratio(self, weights: np.ndarray) -> float:
        ret = self._portfolio_return(weights)
        std = np.sqrt(self._portfolio_variance(weights))
        return -(ret - self.risk_free_rate) / std if std != 0 else np.inf

    def optimize_with_core_satellite(
        self,
        core_etfs: list,
        satellite_etfs: list,
        target_return: float = None,
        maximize_sharpe: bool = True
    ):
        n_assets = len(self.assets)
        initial_weights = np.ones(n_assets) / n_assets

        bounds = []
        core_mask = []
        satellite_mask = []
        for asset in self.assets:
            if asset in core_etfs:
                bounds.append((0.10, 0.50))  # each core: 10%-50%
                core_mask.append(1)
                satellite_mask.append(0)
            elif asset in satellite_etfs:
                bounds.append((0.0, 0.10))   # each satellite: 0%-10%
                core_mask.append(0)
                satellite_mask.append(1)
            else:
                raise ValueError(f"Asset {asset} not found in core or satellite list")

        constraints = [
            {"type": "eq", "fun": lambda w: np.sum(w) - 1},  # sum of weights = 1
            {"type": "ineq", "fun": lambda w: np.dot(w, core_mask) - 0.65},  # core >= 65%
            {"type": "ineq", "fun": lambda w: 0.85 - np.dot(w, core_mask)},  # core <= 85%
            {"type": "ineq", "fun": lambda w: np.dot(w, satellite_mask) - 0.15},  # satellite >= 15%
            {"type": "ineq", "fun": lambda w: 0.35 - np.dot(w, satellite_mask)},  # satellite <= 35%
        ]

        if target_return is not None:
            constraints.append({
                "type": "eq",
                "fun": lambda w: self._portfolio_return(w) - target_return
            })

        result = minimize(
            self._negative_sharpe_ratio if maximize_sharpe else self._portfolio_variance,
            initial_weights,
            method="SLSQP",
            bounds=bounds,
            constraints=constraints
        )

        weights = result.x
        return {
            "weights": dict(zip(self.assets, weights)),
            "expected_return": self._portfolio_return(weights),
            "expected_risk": np.sqrt(self._portfolio_variance(weights)),
            "sharpe_ratio": -(result.fun) if maximize_sharpe else (
                (self._portfolio_return(weights) - self.risk_free_rate) /
                np.sqrt(self._portfolio_variance(weights))
            )
        }
   
### 定义模特卡洛模拟 
    
    def monte_carlo_simulation(self, number_of_portfolio: int = 5000): #默认生成 5000 个投资组合
        results = []
        for _ in range(number_of_portfolio): #用一个 for 循环运行 5000 次(或你设定的次数),每次生成一个新的组合。
            weights = np.random.dirichlet(np.ones(self.number_of_assets), size=1).flatten() #来生成一组加起来为 1 的非负数,作为资产权重, one()是说每个资产都给同样的权重倾向(均匀分布)。最后把二维数组变成一维向量。

            portfolio_return = self._portfolio_return(weights)
            portfolio_risk = np.sqrt(self._portfolio_variance(weights))
            sharpe_ratio = (
                (portfolio_return - self.risk_free_rate) / portfolio_risk
                if portfolio_risk != 0
                else None
            )

           #这一步是把当前组合的权重、预期收益、风险、夏普比率打包成一个字典,添加到 results 列表中。
            results.append(
                {
                    "weights": dict(zip(self.assets, weights)), #zip函数把每个资产名和它的权重配对
                    "expected_return": portfolio_return,
                    "expected_risk": portfolio_risk,
                    "sharpe_ratio": sharpe_ratio,
                }
            )

        return pd.DataFrame(results)

    
### 定义组合优化
    
    def optimize_portfolio(
        self,
        target_return: Optional[float] = None,
        target_risk: Optional[float] = None,
        maximize_sharpe: bool = False,
    ):
        """Optimize portfolio using scipy.optimize.minimize."""
        if target_risk is not None and target_return is not None:
            raise ValueError("Specify either target_return or target_risk, not both.")
        
        initial_weights = np.ones(self.number_of_assets) / self.number_of_assets

        bounds = self.weight_bounds
        
        constraints = [{"type": "eq", "fun": lambda x: np.sum(x) - 1}]

        if maximize_sharpe:
            objective = self._negative_sharpe_ratio
        elif target_risk is not None:
            objective = self._negative_sharpe_ratio
            constraints.append(
                {
                    "type": "eq",
                    "fun": lambda x: target_risk**2 - self._portfolio_variance(x),
                }
            )
        else:
            objective = self._portfolio_variance
            if target_return is not None:
                constraints.append(
                    {
                        "type": "eq",
                        "fun": lambda x: target_return - self._portfolio_return(x),
                    }
                )

        result = minimize(
            objective,
            initial_weights,
            method="SLSQP",
            bounds=bounds,
            constraints=constraints,
        )

        portfolio_return = self._portfolio_return(result.x)
        portfolio_risk = np.sqrt(self._portfolio_variance(result.x))
        sharpe_ratio = (
            (portfolio_return - self.risk_free_rate) / portfolio_risk
            if portfolio_risk != 0
            else 0
        )

        return {
            "weights": dict(zip(self.assets, result.x)),
            "expected_return": portfolio_return,
            "expected_risk": portfolio_risk,
            "sharpe_ratio": sharpe_ratio,
        }

    def print_portfolio(self, portfolio: dict):
        """
        Print the portfolio details in a neat format without np.float64.
        """
        print("Weights:")
        for asset, weight in portfolio["weights"].items():
            print(f"  {asset}: {weight:.4f}")
        
        print(f"Expected Return: {portfolio['expected_return']:.4f}")
        print(f"Expected Risk: {portfolio['expected_risk']:.4f}")
        print(f"Sharpe Ratio: {portfolio['sharpe_ratio']:.4f}")

    def plot_simulation(self, temp: pd.DataFrame):
        # Plot simulated portfolio
        fig = go.Figure(
            data=go.Scatter(
                x=temp["expected_risk"],
                y=temp["expected_return"],
                mode="markers",
                marker=dict(
                    color=temp["sharpe_ratio"],
                    colorbar=dict(
                        title="Sharpe Ratio",
                        tickvals=[temp["sharpe_ratio"].min(), temp["sharpe_ratio"].max()],
                        ticktext=[f"{temp['sharpe_ratio'].min():.2f}", f"{temp['sharpe_ratio'].max():.2f}"],
                    ),
                    symbol="cross",
                ),
                name="Simulated Portfolio",
                text=temp["sharpe_ratio"],
                hovertemplate="Sharpe Ratio: %{text}<br>Expected Volatility: %{x}<br>Expected Return: %{y}<extra></extra>",
                showlegend=True,
            )
        )

        # Plot max Sharpe ratio
        max_sharpe_idx = temp.sharpe_ratio.idxmax()
        fig.add_trace(
            go.Scatter(
                x=[temp.iloc[max_sharpe_idx]["expected_risk"]],
                y=[temp.iloc[max_sharpe_idx]["expected_return"]],
                mode="markers",
                marker=dict(color="RoyalBlue", size=20, symbol="star"),
                name="Max Sharpe",
                text="Max Sharpe Portfolio",
                hovertemplate="Max Sharpe Portfolio<br>Expected Volatility: %{x}<br>Expected Return: %{y}<extra></extra>",
            )
        )

        # Update layout for title and labels
        fig.update_layout(
            title="Monte Carlo Simulated Portfolio",
            xaxis_title="Expected Volatility",
            yaxis_title="Expected Return",
            width=800,
            height=600,
            showlegend=False,
        )

        # Show spikes on both axes
        fig.update_xaxes(showspikes=True)
        fig.update_yaxes(showspikes=True)

        # Show the plot
        fig.show()

    def plot_efficient_frontier(self, opt_var, opt_sharpe, num_portfolios=100):
        """
        Plot the efficient frontier with max Sharpe portfolio and min variance portfolio using Plotly go objects.
        """
        # Results of efficient frontier
        results = []
        target_risks = np.linspace(0.03, 0.15, num=num_portfolios)  # Risk range for efficient frontier

        for target_risk in target_risks:
            result = self.optimize_portfolio(target_risk=target_risk)
            results.append(
                {
                    "expected_return": result["expected_return"],
                    "expected_risk": result["expected_risk"],
                    "sharpe_ratio": result["sharpe_ratio"],
                }
            )

        # Create a DataFrame of results
        frontier = pd.DataFrame(results)
        
        # Plot efficient frontier using lines
        fig = go.Figure(
            data=go.Scatter(
                x=frontier["expected_risk"], 
                y=frontier["expected_return"],
                # mode="lines",
                mode='markers', 
                marker=dict(
                    color=frontier["sharpe_ratio"],
                    colorbar=dict(
                        title="Sharpe Ratio",
                        tickvals=[frontier["sharpe_ratio"].min(), frontier["sharpe_ratio"].max()],
                        ticktext=[f"{frontier['sharpe_ratio'].min():.2f}", f"{frontier['sharpe_ratio'].max():.2f}"],
                    ),
                    symbol='cross'),
                name="Efficient Frontier",
                line=dict(color="green"),
                hovertemplate="Expected Volatility: %{x}<br>Expected Return: %{y}<extra></extra>",
            )
        )

        # Plot max Sharpe ratio
        fig.add_trace(
            go.Scatter(
                x=[opt_sharpe["expected_risk"]],
                y=[opt_sharpe["expected_return"]],
                mode="markers",
                marker=dict(color="red", size=20, symbol="star"),
                name="Max Sharpe",
                text="Max Sharpe Portfolio",
                hovertemplate="Max Sharpe Portfolio<br>Expected Volatility: %{x}<br>Expected Return: %{y}<extra></extra>",
            )
        )

        # Plot min variance portfolio
        fig.add_trace(
            go.Scatter(
                x=[opt_var["expected_risk"]],
                y=[opt_var["expected_return"]],
                mode="markers",
                marker=dict(color="green", size=20, symbol="star"),
                name="Min Variance",
                text="Min Variance Portfolio",
                hovertemplate="Min Variance Portfolio<br>Expected Volatility: %{x}<br>Expected Return: %{y}<extra></extra>",
            )
        )

        # Update layout for title and labels
        fig.update_layout(
            title="Efficient Frontier with Max Sharpe and Min Variance Portfolios",
            xaxis_title="Expected Volatility (%)",
            yaxis_title="Expected Return (%)",
            width=800,
            height=600,
            showlegend=False,
        )

        # Show spikes on both axes
        fig.update_xaxes(showspikes=True)
        fig.update_yaxes(showspikes=True)

        # Show the plot
        fig.show()
In [3]:
# Inport selected ETFs

aaa = pd.read_csv("aaa.csv", encoding="utf-8-sig")
aaa.columns = aaa.columns.str.strip() 
aaa = aaa[["Date", "NAV"]]  
aaa["Date"] = pd.to_datetime(aaa["Date"])
aaa.set_index("Date", inplace=True)
aaa.rename(columns={"NAV": "AAA"}, inplace=True)

asia = pd.read_csv("asia.csv", encoding="utf-8-sig")
asia.columns = asia.columns.str.strip()  
asia = asia[["Date", "NAV"]]  
asia["Date"] = pd.to_datetime(asia["Date"])
asia.set_index("Date", inplace=True)
asia.rename(columns={"NAV": "ASIA"}, inplace=True)

bnds = pd.read_csv("bnds.csv", encoding="utf-8-sig")
bnds.columns = bnds.columns.str.strip()  
bnds = bnds[["Date", "NAV"]]  
bnds["Date"] = pd.to_datetime(bnds["Date"])
bnds.set_index("Date", inplace=True)
bnds.rename(columns={"NAV": "BNDS"}, inplace=True)

smll = pd.read_csv("smll.csv", encoding="utf-8-sig")
smll.columns = smll.columns.str.strip()  
smll = smll[["Date", "NAV"]]  
smll["Date"] = pd.to_datetime(smll["Date"])
smll.set_index("Date", inplace=True)
smll.rename(columns={"NAV": "SMLL"}, inplace=True)

xmet = pd.read_csv("xmet.csv", encoding="utf-8-sig")
xmet.columns = xmet.columns.str.strip()  
xmet = xmet[["Date", "NAV"]]  
xmet["Date"] = pd.to_datetime(xmet["Date"])
xmet.set_index("Date", inplace=True)
xmet.rename(columns={"NAV": "XMET"}, inplace=True)

gcap = pd.read_csv("gcap.csv", encoding="utf-8-sig")
gcap.columns = gcap.columns.str.strip()  
gcap= gcap[["Date", "NAV"]] 
gcap["Date"] = pd.to_datetime(gcap["Date"], dayfirst=True)
gcap.set_index("Date", inplace=True)
gcap.rename(columns={"NAV": "GCAP"}, inplace=True)

emkt = pd.read_csv("emkt.csv", encoding="utf-8-sig")
emkt.columns = emkt.columns.str.strip()  
emkt = emkt[["Date", "NAV"]]  
emkt["Date"] = pd.to_datetime(emkt["Date"], dayfirst=True)
emkt.set_index("Date", inplace=True)
emkt.rename(columns={"NAV": "EMKT"}, inplace=True)

ebnd = pd.read_csv("ebnd.csv", encoding="utf-8-sig")
ebnd.columns = ebnd.columns.str.strip()  
ebnd = ebnd[["Date", "NAV"]]  
ebnd["Date"] = pd.to_datetime(ebnd["Date"], dayfirst=True)
ebnd.set_index("Date", inplace=True)
ebnd.rename(columns={"NAV": "EBND"}, inplace=True)

iwld = pd.read_csv("iwld.csv", encoding="utf-8-sig")
iwld.columns = iwld.columns.str.strip()  
iwld = iwld[["Date", "NAV"]]  
iwld["Date"] = pd.to_datetime(iwld["Date"], dayfirst=True)
iwld.set_index("Date", inplace=True)
iwld.rename(columns={"NAV": "IWLD"}, inplace=True)


# 合并所有 ETF 到一个 DataFrame
price_data = pd.concat([aaa, asia, bnds, smll, xmet, gcap, emkt, ebnd, iwld], axis=1).dropna()
# 确保索引是日期时间类型
price_data.index = pd.to_datetime(price_data.index)

# 筛选日期范围
filtered_data = price_data.loc["2023-10-31" : "2024-04-30"]

# 查看结果
filtered_data

# 定义 core 和 satellite ETF 名称
core_etfs = ["AAA", "ASIA", "IWLD", "EBND"]
satellite_etfs = ["SMLL", "XMET", "GCAP", "EMKT", "BNDS"]
all_etfs = core_etfs + satellite_etfs

# 设置每个 ETF 的权重限制
weight_bounds = []
for etf in all_etfs:
    if etf in core_etfs:
        weight_bounds.append((0.1, 0.5))
    else:
        weight_bounds.append((0.00001, 0.1))  # 大于 0%,不等于 0
In [4]:
optimizer = PortfolioOptimization(data=filtered_data, weight_bounds=weight_bounds)

# 执行优化(最大化夏普比率)
result = optimizer.optimize_with_core_satellite(
    core_etfs=core_etfs,
    satellite_etfs=satellite_etfs,
    maximize_sharpe=True
)

# 模拟和优化
mc = optimizer.monte_carlo_simulation()
ms = optimizer.optimize_portfolio(maximize_sharpe=True)
mv = optimizer.optimize_portfolio(target_return=0.25)
mr = optimizer.optimize_portfolio(target_risk=0.25)

# 打印结果
print(optimizer.evaluate_target_return(ms, cpi=0.03))
optimizer.print_portfolio(ms)
optimizer.plot_simulation(mc)
optimizer.plot_efficient_frontier(mv, ms, num_portfolios=300)
✅ Portfolio meets the target return of 9.00% (Actual: 17.35%)
Weights:
  AAA: 0.2511
  ASIA: 0.1000
  BNDS: 0.1000
  SMLL: 0.1000
  XMET: 0.0489
  GCAP: 0.1000
  EMKT: 0.1000
  EBND: 0.1000
  IWLD: 0.1000
Expected Return: 0.1735
Expected Risk: 0.0396
Sharpe Ratio: 4.3870
/Users/nickshang/anaconda3/envs/test/lib/python3.10/site-packages/scipy/optimize/_slsqp_py.py:437: RuntimeWarning: Values in x were outside bounds during a minimize step, clipping to bounds
  fx = wrapped_fun(x)
/Users/nickshang/anaconda3/envs/test/lib/python3.10/site-packages/scipy/optimize/_slsqp_py.py:441: RuntimeWarning: Values in x were outside bounds during a minimize step, clipping to bounds
  g = append(wrapped_grad(x), 0.0)
/Users/nickshang/anaconda3/envs/test/lib/python3.10/site-packages/scipy/optimize/_slsqp_py.py:495: RuntimeWarning: Values in x were outside bounds during a minimize step, clipping to bounds
  a_eq = vstack([con['jac'](x, *con['args'])
/Users/nickshang/anaconda3/envs/test/lib/python3.10/site-packages/scipy/optimize/_slsqp_py.py:437: RuntimeWarning:

Values in x were outside bounds during a minimize step, clipping to bounds

/Users/nickshang/anaconda3/envs/test/lib/python3.10/site-packages/scipy/optimize/_slsqp_py.py:441: RuntimeWarning:

Values in x were outside bounds during a minimize step, clipping to bounds

/Users/nickshang/anaconda3/envs/test/lib/python3.10/site-packages/scipy/optimize/_slsqp_py.py:495: RuntimeWarning:

Values in x were outside bounds during a minimize step, clipping to bounds

In [ ]:
 
In [ ]: